#YashanDB Kafka Connector
Debezium是专门为Kafka实现的CDC连接器,Debezium Connector YashanDB连接器同步全量快照数据,捕获并记录YashanDB数据库中发生的行级更改,包含连接器运行时添加的表。您可以配置YashanDB连接器,使其为特定的schema和表捕获更改事件,将更改事件同步到Kafka。
# 版本配套说明
Connector Version | YashanDB Version | YashanDB Jdbc Version | Debezium Version | Kafka Version | Java Version |
---|---|---|---|---|---|
2.4.2.0 | 支持YashanDB YStream的YashanDB版本 | 支持YashanDB YStream的Jdbc 版本 | 2.4.2.Final | 2.x,3.x | 11+ |
# 前提条件
已安装Apache Zookeeper、Apache Kafka、 kafka Connect ,具体操作请查阅Apache Kafka官方文档 (opens new window)。
在kafka安装目录下创建
plugins/debezium-connector-yashandb
目录,并获取下列jar包并存放至该新建目录:可自行使用构建自动化工具(例如Maven)将项目构建打包,也可联系我们的技术支持获取。
debezium-connector-yashandb-2.4.2.jar
YStream-vxx.xx.xx.jar
yashandb-jdbc.jar
# 使用限制
数据类型不能为XMLTYPE、JSON以及自定义数据类型。
# 配置YashanDB
# 步骤1:配置Ystream内存池
增量数据依赖YashanDB的YStream实时获取YashanDB的已提交数据。
当您使用YashanDB作为源使用含增量同步的任务时,您需要在YashanDB中为YStream分配内存池:
ALTER SYSTEM SET STREAM_POOL_SIZE = streamPoolSize;
# 步骤2:开启附加日志
读取增量数据变更需要开启附加日志,可按需开启库级或表级附加日志。
Caution:
不开启附加日志或开启附加日志的对象不正确会导致数据丢失甚至任务失败。
当您需要监听库下全部对象时(包含新增对象),可开全库附加日志,方式如下:
ALTER DATABASE ADD SUPPLEMENTAL LOG TABLE TYPE (HEAP);
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA ( PRIMARY KEY) COLUMNS;
当您仅需要监听某些表时,可开启表级附加日志,方式如下:
ALTER TABLE tablename ADD SUPPLEMENTAL LOG DATA ( PRIMARY KEY ) COLUMNS;
Caution:
不开启附加日志或开启附加日志的对象不正确会导致数据丢失甚至任务失败。
# 步骤3:配置YStream服务
YStream服务的详细配置教程请查阅DBMS_YSTREAM_ADM高级包。
调用DBMS_YSTREAM_ADM.CREATE函数创建YStream服务:
EXEC DBMS_YSTREAM_ADM.CREATE('serverName', 'connect_user', start_scn)
Copied!start_scn可通过查询select CURRENT_SCN from V$DATABASE获取。
调用DBMS_YSTREAM_ADM.ADD_TABLES函数为YStream服务新增解析表名和模式(即connector需要捕获的表名和模式)。
按需调用DBMS_YSTREAM_ADM.SET_PARAMETER函数为YStream服务设置相关参数。
调用DBMS_YSTREAM_ADM.START函数启动YStream服务:
DBMS_YSTREAM_ADM.START(server_name IN VARCHAR(64));
Copied!
# 步骤4:授予相关用户权限
连接用户需要相关权限,请授予如下权限保障任务能够正常运行:
GRANT CREATE SESSION TO username;
GRANT SELECT ON V_$DATABASE TO username;
GRANT SELECT ON V_$TRANSACTION TO username;
GRANT SELECT ON V_$YSTREAM_SERVER TO username;
GRANT FLASHBACK ANY TABLE TO username;
GRANT SELECT ANY TABLE TO username;
GRANT YSTREAM_CAPTURE TO username;
# 部署debezium connector yashandb
下载debezium相关依赖并存放至
plugins/debezium-connector-yashandb
目录,下载地址为https://repo1.maven.org/maven2/io/debezium/debezium-connector-oracle/2.4.2.Final/debezium-connector-oracle-2.4.2.Final-plugin.tar.gz (opens new window)。删除上述依赖包中的debezium-connector-oracle-2.4.2.Final.jar。
将如下文件放入目录plugins/debezium-connector-yashandb中。
- debezium-connector-yashandb-2.4.2.0.jar
- YashanDB jdbc依赖
- YashanDB YStream依赖
将绝对路径
/kafkahome/plugin/debezium-connector-yashandb
添加到Kafka connect的配置文件的plugin.path中。重启Kafka Connect进程。
bin/connect-distributed.sh config/connect-distributed.properties
Copied!配置debezium YashanDB connector,如下:
{ "name": "yashandb-connector", "config": { "connector.class" : "io.debezium.connector.yashandb.YashanDBConnector", "database.hostname" : "<YashanDB_IP_ADDRESS>", "database.port" : "1688", "database.user" : "SYS", "database.password" : "Cod-2022", "database.dbname" : "TEST", "topic.prefix" : "my_topic", "database.url" : "jdbc:yasdb://<YashanDB_IP_ADDRESS>:1688/SYS", "table.include.list" : "TEST.TAB01", "database.ystream.server.name" : "server1", "lob.enabled" : "true", "schema.history.internal.kafka.bootstrap.servers" : "kafka:9092", "schema.history.internal.kafka.topic": "schema-changes.inventory" } }
Copied!启动任务。
# 连接器参数
本文仅列示部分关键参数,更多详情请参考Debezium Connector for Oracle官方文档 (opens new window)。
参数名 | 默认值 | 参数说明 |
---|---|---|
name | (none) | 连接器的唯一名称,必选参数,要求全局唯一 |
connector.class | (none) | 连接器的Java类的名称,固定为io.debezium.connector.oracle |
database.hostname | (none) | YashanDB数据库服务的IP地址或hostname |
database.port | (none) | YashanDB数据库服务的端口 |
database.user | (none) | 连接YashanDB数据库的用户名 |
database.password | (none) | 连接YashanDB数据库的用户名的密码 |
database.dbname | (none) | YashanDB数据库的名称 |
database.url | (none) | YashanDB数据库的JDBC URL |
database.ystream.server.name | (none) | Ystream服务名称,要求全局唯一,连接器会根据该名称自动创建相应的Ystream服务 |
ystream.blocking.queue.size | 128 | YStream客户端内置阻塞队列的长度,获取增量逻辑日志时直接从该队列获取 |
ystream.poll.timeout | 10 | 从阻塞队列中获取下一个结果的超时时间(单位:秒) |
ystream.client.response.timeout | 60 | YStream服务端等待YStream客户端响应的最长时间(单位:秒) |
topic.prefix | (none) | 主题前缀,用于为连接器从中捕获更改的Oracle数据库服务器提供命名空间。该参数值将用作连接器发出的所有Kafka主题名称的前缀,要求全局唯一,由字母、数字、连字符、点和下划线组成。 连接器无法恢复其数据库架构的历史主题,一旦更改该值并重新启动,连接器将会向新主题发出后续事件,请不要轻易更改该参数值 |
snapshot.mode | initial | 连接器对捕获的表进行快照的模式 * always:连接器每次启动时始终执行快照(表结构和数据),快照完成后连接器开始捕获并记录目标表发生的表结构和数据更改 * initial:连接器首次启动时执行快照(表结构和数据),快照完成后连接器开始捕获并记录目标表发生的表结构和数据更改,后续启动时不会再次执行快照 * initial_only:连接器首次启动时执行快照(表结构和数据),在目标表发生连接器启动后的首次更改时中止快照,且连接器不处理目标表发生的任何后续更改 * schema_only:连接器每次启动时始终执行快照(仅含表结构),快照完成后连接器开始捕获并记录目标表发生的表结构更改 * schema_only_recovery:基于schema_only模式的恢复模式,可用于连接器意外断连后再次重启时,连接器启动后会执行快照恢复损坏或丢失的历史主题,照完成后,连接器的表现同schema_only模式。仅在连接器上一次意外断连时间点至快照时间点期间未发生表结构更改的情况下,才能安全使用此模式。您也可以按需定期设置该值清理因意外断连而增长的历史主题 更多详情请查阅debezium官方文档 (opens new window) |
schema.include.list | (none) | 需捕获变更的schema清单,可选参数,清单采用正则表达式,多个schema名称间用逗号, 分隔,若配置该参数,连接器将只捕获清单中包含的schema相关变更 通常该参数与schema.exclude.list参数择一配置即可,且schema.include.list优先级更高(即配置了schema.include.list后schema.exclude.list将失效),若二者均不配置,则默认捕获所有非系统schema的更改 |
schema.exclude.list | (none) | 无需捕获变更的schema清单,可选参数,清单采用正则表达式,多个schema名称间用逗号, 分隔,若配置该参数,连接器将捕获除清单中包含的schema外的其他非系统schema的相关变更 通常该参数与schema.include.list参数择一配置即可,且schema.include.list优先级更高(即配置了schema.include.list后schema.exclude.list将失效),若二者均不配置,则默认捕获所有非系统schema的更改 |
table.include.list | (none) | 需捕获变更的表清单,可选参数,清单采用正则表达式,多个表名间用逗号, 分隔,若配置该参数,连接器将只捕获清单中包含的表相关变更 通常该参数与table.exclude.list参数择一配置即可,且table.include.list优先级更高(即配置了table.include.list后table.exclude.list将失效),若二者均不配置,则默认捕获所有非系统表的更改 |
table.exclude.list | (none) | 无需捕获变更的表清单,可选参数,清单采用正则表达式,多个表名间用逗号, 分隔,若配置该参数,连接器将捕获除清单中包含的表外的其他非系统表的相关变更 通常该参数与table.include.list参数择一配置即可,且table.include.list优先级更高(即配置了table.include.list后table.exclude.list将失效),若二者均不配置,则默认捕获所有非系统表的更改 |
max.batch.size | 2048 | 连接器每次迭代期间要处理的单批事件的最大大小,正整数值 |
max.queue.size | 9182 | 阻塞队列可以容纳的最大记录数,正整数值。当Debezium读取数据库中的事件流时,它会在将事件写入Kafka之前将其放置在阻塞队列中。在连接器接收消息的速度快于将消息写入Kafka的速度的情况下,或当Kafka不可用时,阻塞队列可以为从数据库读取更改事件提供背压。当连接器定期记录偏移量时,队列中保存的事件将被忽略 需将max.queue.size的值设置为大于max.batch.size的数值 |
max.queue.size.in.bytes | 0(disabled) | 阻塞队列的最大容量(单位:字节),长整数值。默认情况下,不会为阻塞队列指定卷限制。如需指定队列可以消耗的字节数,请将此属性设置为正长值 若同时设置了max.queue.size,则当队列大小达到任一属性指定的限制时将阻止对队列的写入。例如max.queue.size=1000且max.queue.size.in.bytes=5000,在队列包含1000条记录或队列中的记录量达到5000字节后,将阻止向队列写入 |
poll.interval.ms | 500 (0.5 second) | 连接器在每次迭代期间应等待新更改事件出现的时间(单位:毫秒),正整数值 |
snapshot.fetch.size | 10000 | 在snapshot快照时从每个表一次读取的最大行数,连接器将以该参数指定大小分批多次读取表内容 |
query.fetch.size | 10000 | JDBC查询的fetch size |
lob.enabled | false | 控制是否在更改事件中发送大对象(CLOB或BLOB等)列值。默认情况下,更改事件中大对象列不包含列值。如需捕获大型对象值并在更改事件中对其进行序列化(会有一定的开销),请将该参数设置为true |
decimal.handling.mode | precise | 连接器处理NUMBER、DECIMAL和NUMERIC列的浮点值的模式 * precise:使用java.math精确表示值,以二进制形式在更改事件中表示的BigDecimal值。该模式对负scale的浮点值支持程度有限,建议使用string模式 * double:使用双精度值表示值,该模式可能会导致精度损失 * string:将值编码为格式化字符串,该模式可能会导致有关真实类型的语义信息丢失 |
unavailable.value.placeholder | __debezium_unavailable_value | 连接器用于标识未从数据库中获取到真实数据值但该值未发生更改的常数代值,例如虽未从数据库获取到某一LOB值但明确该LOB值未发生更改就使用unavailable.value.placeholder参数值代替该LOB值 |
skipped.operations | t | 连接器在流式传输过程中需跳过的操作清单,多个操作名称间用逗号, 分隔可以配置的操作名包括: * c:表示插入/创建操作 * u:表示更新操作 * d:表示删除操作 * t:表示截断操作,默认情况下,只跳过截断操作 |
signal.data.collection | (none) value | 用于向连接器发送信号的数据采集的完全限定名称,格式为\<databaseName>.\<schemaName>.\<tableName> |
signal.enabled.channels | source | 连接器可用的信号通道名称清单。默认情况下,可用通道包括source、kafka、file、jmx |
notification.enabled.channels | (none) | 连接器可用的通知通道名称清单。默认情况下,可用通道包括sink、log、jmx |
incremental.snapshot.chunk.size | 1024 | 在增量快照块期间,连接器获取并读入内存的最大行数 |
topic.naming.strategy | io.debezium.schema.SchemaTopicNamingStrategy | 用于确定数据更改、模式更改、事务、心跳事件等的主题名称的类名,默认为SchemaTopicNamingStrategy |
topic.delimiter | . | 主题名称的分隔符,默认为. |
snapshot.max.threads | 1 | 连接器在执行初始快照时使用的线程数。如需启用并行初始快照,请将属性设置为大于1的值 |
# 数据类型映射
当Debezium Oracle连接器检测到表行的值发生更改时,会发出表示该更改的更改事件。每个更改事件记录的结构与原始表的结构相同,事件记录包含每个列值的字段。表列的数据类型决定了连接器如何在更改事件字段中表示列的值,如以下各节中的表所示。
对于表中的每一列,Debezium将源数据类型映射到literal文本类型。
文本类型
使用以下Kafka Connect架构类型之一描述如何按字面表示值:INT8、INT16、INT32、INT64、FLOAT32、FLOAT64、BOOLEAN、STRING、BYTES、ARRAY、MAP、STRUCT。
YashanDB data type | Literal type (schema type) |
---|---|
CHAR[(M)] | STRING |
NCHAR[(M)] | STRING |
VARCHAR[(M)] | STRING |
NVARCHAR[(M)] | STRING |
BLOB | BYTES |
CLOB | STRING |
NCLOB | STRING |
RAW | BYTES |
TINYINT | INT8 |
SMALLINT | INT16 |
INT | INT32 |
BIGINT | INT64 |
FLOAT | FLOAT32 |
DOUBLE | FLOAT64 |
NUMBER | BYTES / INT8 / INT16 / INT32 / INT64 |
BIT(1) | BOOLEAN |
BIT(n) | BYTES |
BOOLEAN | BOOLEAN |
DATE | INT64 |
TIME | INT64 |
TIMESTAMP | INT64 |
INTERVAL YEAR TO MOUTH | FLOAT64 |
INTERVAL DAY TO SECOND | FLOAT64 |
ROWID | STRING |
UROWID | STRING |
# 常见问题
# Q1. 报错“YashanDB does not yet have the YStream server ‘serverxx’ or check option 'database.ystream.server.name' if the parameters are filled in correctly. Please create and configure the YStream server, refer to the link 'xxx'.”该如何处理?
该报错表示database.ystream.server.name
参数值对应的YStream服务不存在,请先调用DBMS_YSTREAM_ADM高级包对应函数创建YStream服务并完成相关配置。
# Q2. 报错:YashanDB YStream server status is xxx. Please execute 'DBMS_YSTREAM_ADM.START( server_name IN VARCHAR(64) );' start YStream server。
该报错表示database.ystream.server.name
参数值对应的YStream服务处于非运行状态或者非启动状态,请先在数据库中调用DBMS_YSTREAM_ADM.START函数启动该YStream服务:
exec DBMS_YSTREAM_ADM.START('YStream服务名');
# Q3. Decimal数值同步到Kafka后,为什么序列化出来数据出错?
debezium会将负scale的Decimal进行特殊处理,建议使用参数decimal.handling.mode
=string来规避。